Creational Patterns
Before reading any further, predict what this prints:
import threading

class DatabaseConnection:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

results = []

def create_connection():
    conn = DatabaseConnection()
    results.append(id(conn))

threads = [threading.Thread(target=create_connection) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(set(results)))  # How many unique object IDs?
Most engineers answer 1, and on CPython that is what you will usually see. Nothing guarantees it, though: the result can be anywhere from 1 to 100, and it can change between runs.
This code has a race condition. Between the if cls._instance is None check and the cls._instance = super().__new__(cls) assignment, multiple threads can observe _instance as None and each create a new instance. The classic Singleton implementation shown in many Python tutorials is broken under concurrency.
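Because the window between the check and the assignment is only a few bytecodes wide, the race is easy to miss in a quick experiment. The following is a minimal sketch, not part of the original snippet: it widens the window with an artificial `time.sleep` and records every object that actually gets built. The names `RacyConnection` and `constructed` are hypothetical and exist only for this demonstration.

```python
import threading
import time


class RacyConnection:
    """Same naive __new__ singleton, with the race window widened on purpose."""

    _instance = None
    constructed = []  # every object actually built (kept alive for inspection)

    def __new__(cls):
        if cls._instance is None:
            time.sleep(0.05)  # demo only: lets other threads pass the check too
            obj = super().__new__(cls)
            cls.constructed.append(obj)  # list.append is thread-safe in CPython
            cls._instance = obj
        return cls._instance


threads = [threading.Thread(target=RacyConnection) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

build_count = len(RacyConnection.constructed)
print(f"'singleton' was constructed {build_count} times")
```

Counting constructions rather than comparing `id()` values avoids a second trap: the ids of discarded objects can be reused, which would undercount the duplicates.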
This lesson shows you how to fix it - and how to think about all five GoF creational patterns correctly in Python.
What You Will Learn
- Why object creation coupling is a production problem, not a theoretical one
- Singleton: thread-safe implementation, metaclass approach, Borg pattern, and the Pythonic module-level alternative
- Factory Method: abstract creator pattern with ABC, real file-format parser example
- Abstract Factory: families of related objects, cloud provider and UI theme examples
- Builder: fluent API construction, ML pipeline config example, dataclass complement
- Prototype: copy.copy vs copy.deepcopy, custom __copy__ and __deepcopy__, neural network config cloning
- Python-specific registry patterns using functools.lru_cache and class decorators
- When to use each pattern and when a simpler Python idiom suffices
Prerequisites
- Python classes, __init__, __new__, __init_subclass__
- abc.ABC and @abstractmethod
- Python threading module basics
- Dataclasses (@dataclass, field())
- Python copy module
Part 1 - Singleton
The Problem: Uncontrolled Multiple Instances
# PROBLEM: Multiple database connection pools created accidentally
class DatabasePool:
    def __init__(self):
        print("Creating new pool - opening 10 connections...")
        self.connections = [self._open_connection() for _ in range(10)]

    def _open_connection(self):
        return {"id": id(self), "status": "open"}

    def get_connection(self):
        return self.connections[0]

# Repository A
class UserRepository:
    def __init__(self):
        self.pool = DatabasePool()  # Creates 10 connections

# Repository B
class OrderRepository:
    def __init__(self):
        self.pool = DatabasePool()  # Creates ANOTHER 10 connections

# At startup:
user_repo = UserRepository()    # pool 1: 10 connections
order_repo = OrderRepository()  # pool 2: another 10 connections
# You now have 20 connections open when you need 10
# Worse: pool 1 and pool 2 have different connection health states
The Singleton pattern ensures a class has exactly one instance and provides a global access point to it. It is the right tool for: connection pools, configuration objects, loggers, caches, and any resource that is expensive to create and should be shared.
Solution 1: __new__-Based Singleton With Thread Safety
import threading
from typing import Any

class Singleton:
    """Thread-safe Singleton base class using __new__ and double-checked locking."""
    _instance: "Singleton | None" = None
    _lock: threading.Lock = threading.Lock()

    def __new__(cls, *args: Any, **kwargs: Any) -> "Singleton":
        # First check without lock - fast path for already-created instance
        if cls._instance is None:
            with cls._lock:
                # Second check with lock - prevents race condition
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
The double-checked locking pattern is critical. Without the second check inside the lock, consider this sequence with two threads T1 and T2:
T1: checks cls._instance is None -> True (instance not yet created)
T2: checks cls._instance is None -> True (instance not yet created)
T1: acquires lock
T1: calls super().__new__(cls) -> creates instance
T1: releases lock
T2: acquires lock
T2: STILL creates a second instance <- BUG without inner check
With the inner check, T2 sees the instance T1 just created and returns it.
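As a rough check of that claim, the sketch below stresses a double-checked singleton with the window deliberately widened inside the lock. `LockedSingleton`, `build_count`, and the `time.sleep` call are assumptions added for the demonstration only; a production class would not sleep.

```python
import threading
import time


class LockedSingleton:
    _instance = None
    _lock = threading.Lock()
    build_count = 0  # only modified while holding _lock

    def __new__(cls):
        if cls._instance is None:          # fast path, no lock taken
            with cls._lock:
                if cls._instance is None:  # re-check while holding the lock
                    time.sleep(0.05)       # demo only: widen the window
                    cls._instance = super().__new__(cls)
                    cls.build_count += 1
        return cls._instance


seen = []
threads = [
    threading.Thread(target=lambda: seen.append(LockedSingleton()))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

unique_ids = {id(obj) for obj in seen}
print(LockedSingleton.build_count, len(unique_ids))
```

Threads that pass the outer check while the first thread is still constructing simply queue on the lock, fail the inner check, and fall through to the shared instance.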
class DatabasePool(Singleton):
    def __init__(self):
        # Guard against re-initialization on subsequent calls
        if hasattr(self, "_initialized"):
            return
        self._initialized = True
        print("Creating pool - opening 10 connections...")
        self._connections: list[dict] = [
            {"id": i, "status": "open"} for i in range(10)
        ]

    def get_connection(self) -> dict:
        # Hand out the first idle connection and mark it as taken
        conn = next(c for c in self._connections if c["status"] == "open")
        conn["status"] = "in_use"
        return conn

    def release_connection(self, conn: dict) -> None:
        conn["status"] = "open"

# Usage
pool1 = DatabasePool()
pool2 = DatabasePool()
assert pool1 is pool2  # True - same object
The _initialized guard is essential. Without it, every call to DatabasePool() re-runs __init__ on the same instance, resetting its state.
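A minimal sketch of what goes wrong without the guard. `NaiveCounter` is a hypothetical class used only to make the reset visible; Python calls `__init__` after `__new__` whenever `__new__` returns an instance of the class, even if that instance already existed.

```python
class NaiveCounter:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # No guard: this runs on every NaiveCounter() call.
        self.count = 0


first = NaiveCounter()
first.count = 5
second = NaiveCounter()  # same object, but __init__ ran again

same_object = first is second
count_after_second_call = first.count
print(same_object, count_after_second_call)
```

The second call hands back the same object, yet its `count` has been silently reset to 0.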
Solution 2: Metaclass Singleton
A metaclass approach is cleaner for libraries because users do not need to inherit from Singleton:
import threading

class SingletonMeta(type):
    """Metaclass that makes any class a thread-safe Singleton."""
    _instances: dict[type, object] = {}
    _lock: threading.Lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            with cls._lock:
                if cls not in cls._instances:
                    instance = super().__call__(*args, **kwargs)
                    cls._instances[cls] = instance
        return cls._instances[cls]

class Logger(metaclass=SingletonMeta):
    def __init__(self, name: str = "app"):
        self.name = name
        self._entries: list[str] = []

    def log(self, message: str) -> None:
        self._entries.append(message)
        print(f"[{self.name}] {message}")

    def get_entries(self) -> list[str]:
        return list(self._entries)

# Verify singleton behavior
logger_a = Logger("service-a")
logger_b = Logger("service-b")  # init args ignored for subsequent calls
assert logger_a is logger_b  # True
assert logger_a.name == "service-a"  # "service-b" was ignored
The metaclass approach intercepts the class call (Logger()) before __new__ and __init__ run. On later calls neither method runs at all, so no _initialized guard is needed.
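To see that ordering concretely, here is a small tracing sketch. `TracingMeta`, `Widget`, and the `events` list are hypothetical names; the metaclass only records the call and then delegates to the default `type.__call__`, which is what invokes `__new__` and `__init__`.

```python
events: list[str] = []


class TracingMeta(type):
    def __call__(cls, *args, **kwargs):
        events.append("metaclass __call__")
        # type.__call__ is what runs __new__ and then __init__
        return super().__call__(*args, **kwargs)


class Widget(metaclass=TracingMeta):
    def __new__(cls, *args, **kwargs):
        events.append("__new__")
        return super().__new__(cls)

    def __init__(self, name: str = "w"):
        events.append("__init__")
        self.name = name


widget = Widget("demo")
print(events)
```

A singleton metaclass exploits exactly this hook: by returning a cached instance instead of calling `super().__call__`, it skips both `__new__` and `__init__`.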
Solution 3: The Borg Pattern (Shared State, Not Shared Identity)
The Borg pattern is a Python-specific alternative that shares state instead of identity. All instances share the same __dict__:
class Borg:
    """All instances share the same state (but are different objects)."""
    _shared_state: dict = {}

    def __init__(self):
        self.__dict__ = self._shared_state

class AppConfig(Borg):
    def __init__(self):
        super().__init__()
        # First call: _shared_state is empty, so we initialize
        if not self._shared_state:
            self.debug = False
            self.db_url = "postgresql://localhost/app"
            self.max_connections = 10

    def update(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

config1 = AppConfig()
config2 = AppConfig()
assert config1 is not config2  # Different objects (unlike Singleton)
assert config1.debug == config2.debug  # True - shared state
config1.update(debug=True)
print(config2.debug)  # True - config2 sees config1's change
Borg vs Singleton:
| Aspect | Singleton | Borg |
|---|---|---|
| Object identity | All calls return the same object | Different objects, same __dict__ |
| Subclassing | With SingletonMeta, each subclass gets its own instance | Subclasses share the parent's state unless they override _shared_state (dangerous) |
| Testing | Reset by cls._instance = None | Reset by cls._shared_state.clear() |
| Transparency | is check reveals singleton | Looks like normal instances |
| Use case | Resource managers, pools | Configuration objects |
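The subclassing row is the one that most often surprises people. The sketch below, using hypothetical `Settings`, `FeatureFlags`, and `IsolatedFlags` classes, shows state leaking between unrelated Borg subclasses and one way to stop it: giving the subclass its own `_shared_state` dict.

```python
class Borg:
    _shared_state: dict = {}

    def __init__(self):
        self.__dict__ = self._shared_state


class Settings(Borg):
    pass


class FeatureFlags(Borg):
    pass


class IsolatedFlags(Borg):
    _shared_state: dict = {}  # own dict, so nothing is shared with Borg


settings = Settings()
settings.debug = True

flags = FeatureFlags()
leaked = getattr(flags, "debug", None)  # attribute set via Settings shows up here

isolated = IsolatedFlags()
isolated_sees_debug = hasattr(isolated, "debug")

print(leaked, isolated_sees_debug)
```

`self._shared_state` is resolved through normal attribute lookup, so a subclass-level override is enough to give that subclass a separate state dictionary.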
Solution 4: The Pythonic Way - Module-Level Singleton
For most cases in production Python, skip the metaclass entirely. A module is a singleton:
# config.py
import os
from dataclasses import dataclass, field
from functools import cached_property

@dataclass
class _AppConfig:
    """Private config class - do not instantiate directly."""
    db_url: str = field(default_factory=lambda: os.environ["DATABASE_URL"])
    redis_url: str = field(
        default_factory=lambda: os.environ.get("REDIS_URL", "redis://localhost")
    )
    debug: bool = field(
        default_factory=lambda: os.environ.get("DEBUG", "").lower() == "true"
    )
    max_db_connections: int = field(
        default_factory=lambda: int(os.environ.get("MAX_DB_CONN", "10"))
    )

    @cached_property
    def is_production(self) -> bool:
        return not self.debug

# This is the singleton - created once when config.py is first imported
config = _AppConfig()

# Any module that needs config
from config import config

def get_user(user_id: str) -> dict:
    conn = get_db_connection(config.db_url)
    ...
Python caches imported modules in sys.modules, so the body of config.py runs once per process and the config object is created once. This is simpler, more readable, and easier to test (patch attributes on the config object).
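A minimal sketch of such a test, assuming a simplified `_AppConfig` with hard-coded defaults and a hypothetical consumer function `connection_label`. It uses `unittest.mock.patch.object` from the standard library, which sets the attribute for the duration of the `with` block and restores it afterwards.

```python
from dataclasses import dataclass
from unittest.mock import patch


@dataclass
class _AppConfig:
    db_url: str = "postgresql://localhost/app"
    debug: bool = False


config = _AppConfig()  # stands in for `from config import config`


def connection_label() -> str:
    """Hypothetical consumer that reads the shared config at call time."""
    mode = "debug" if config.debug else "prod"
    return f"{config.db_url} [{mode}]"


def test_connection_label_in_debug_mode() -> None:
    with patch.object(config, "debug", True):
        assert connection_label().endswith("[debug]")
    # patch.object restored the original value on exit
    assert config.debug is False


test_connection_label_in_debug_mode()
```

Patching attributes on the object, rather than rebinding the name `config` inside the config module, matters because every module that ran `from config import config` holds its own reference to the original object.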
Why Naive Singletons Break with Subclasses
# Dangerous - naive __new__ singleton breaks with inheritance
class Base(Singleton):
    pass

class Child(Base):
    pass

b = Base()
c = Child()
# What is c?
print(type(c))  # <class '__main__.Base'> - NOT Child!
Because _instance is stored on Base, and Child inherits _instance, the if cls._instance is None check for Child finds Base's instance and returns it. The metaclass approach avoids this because it uses cls as the dictionary key:
class Parent(metaclass=SingletonMeta):
    pass

class Child(Parent):  # inherits the metaclass from Parent
    pass

assert Parent() is Parent()  # True - Parent singleton
assert Child() is Child()  # True - Child singleton
assert type(Child()) is Child  # True - not a Parent instance
assert Child() is not Parent()  # True - different singletons
Part 2 - Factory Method
The Problem: Type-Dependent Creation Scattered Everywhere
# PROBLEM: Parser creation logic duplicated across the codebase
# In data_loader.py
def load_dataset(filepath: str):
    if filepath.endswith(".json"):
        import json
        with open(filepath) as f:
            return json.load(f)
    elif filepath.endswith(".yaml") or filepath.endswith(".yml"):
        import yaml
        with open(filepath) as f:
            return yaml.safe_load(f)
    elif filepath.endswith(".toml"):
        import tomllib
        with open(filepath, "rb") as f:
            return tomllib.load(f)
    else:
        raise ValueError(f"Unsupported format: {filepath}")

# In config_reader.py - same logic duplicated
def read_config(filepath: str):
    if filepath.endswith(".json"):
        ...  # same code again

# In schema_validator.py - duplicated a third time
def validate_from_file(filepath: str):
    if filepath.endswith(".json"):
        ...  # same code a third time
Adding CSV support means editing all three functions, and every new call site that copies the branch adds one more place to keep in sync. This violates the Open/Closed Principle: adding a format requires modifying existing code.
The Factory Method Pattern
The Factory Method pattern defines an interface for creating an object but lets subclasses decide which class to instantiate. In Python, "subclasses" often means concrete implementors of an ABC.
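Before the fuller parser example, here is a minimal sketch of the textbook shape: a creator class whose subclasses override one creation method. All names (`Exporter`, `Report`, `create_exporter`, and the concrete classes) are hypothetical and chosen only to keep the roles visible.

```python
from abc import ABC, abstractmethod
import json


class Exporter(ABC):  # the "Product"
    @abstractmethod
    def export(self, rows: list[dict]) -> str: ...


class JSONExporter(Exporter):
    def export(self, rows: list[dict]) -> str:
        return json.dumps(rows)


class TextExporter(Exporter):
    def export(self, rows: list[dict]) -> str:
        return "\n".join(
            ", ".join(f"{key}={value}" for key, value in row.items())
            for row in rows
        )


class Report(ABC):  # the "Creator"
    def __init__(self, rows: list[dict]):
        self.rows = rows

    @abstractmethod
    def create_exporter(self) -> Exporter:
        """The factory method: subclasses choose the concrete Exporter."""

    def render(self) -> str:
        # Shared logic that never names a concrete Exporter class
        return self.create_exporter().export(self.rows)


class JSONReport(Report):
    def create_exporter(self) -> Exporter:
        return JSONExporter()


class TextReport(Report):
    def create_exporter(self) -> Exporter:
        return TextExporter()


rows = [{"id": 1, "name": "Ada"}]
json_output = JSONReport(rows).render()
text_output = TextReport(rows).render()
print(json_output)
print(text_output)
```

`Report.render` depends only on the abstract `Exporter`, so a new output format is a new pair of subclasses rather than an edit to existing code.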
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any
import json

# --- Abstract base: the "Product" interface in GoF terms ---
class DataParser(ABC):
    """Abstract parser - defines the interface every concrete parser implements."""

    @abstractmethod
    def parse(self, content: str | bytes) -> Any:
        """Parse raw content into a Python object."""
        ...

    @abstractmethod
    def supported_extensions(self) -> tuple[str, ...]:
        """Return file extensions this parser handles."""
        ...

    def load_file(self, filepath: str | Path) -> Any:
        """Template method: read file and delegate parsing to subclass."""
        filepath = Path(filepath)
        mode = "rb" if self._needs_binary() else "r"
        with open(filepath, mode) as f:
            content = f.read()
        return self.parse(content)

    def _needs_binary(self) -> bool:
        """Override in subclasses that require binary mode."""
        return False
# --- Concrete parsers ---
class JSONParser(DataParser):
    def parse(self, content: str | bytes) -> Any:
        return json.loads(content)

    def supported_extensions(self) -> tuple[str, ...]:
        return (".json",)

class YAMLParser(DataParser):
    def parse(self, content: str | bytes) -> Any:
        import yaml
        return yaml.safe_load(content)

    def supported_extensions(self) -> tuple[str, ...]:
        return (".yaml", ".yml")

class TOMLParser(DataParser):
    def parse(self, content: str | bytes) -> Any:
        import tomllib
        text = content.decode() if isinstance(content, bytes) else content
        return tomllib.loads(text)

    def supported_extensions(self) -> tuple[str, ...]:
        return (".toml",)

    def _needs_binary(self) -> bool:
        return True
class CSVParser(DataParser):
    def __init__(
        self, delimiter: str = ",", extensions: tuple[str, ...] = (".csv",)
    ):
        self._delimiter = delimiter
        self._extensions = extensions

    def parse(self, content: str | bytes) -> list[dict]:
        import csv
        import io
        if isinstance(content, bytes):
            content = content.decode()
        reader = csv.DictReader(io.StringIO(content), delimiter=self._delimiter)
        return list(reader)

    def supported_extensions(self) -> tuple[str, ...]:
        return self._extensions

# --- The factory ---
class ParserFactory:
    """Creates the correct parser for a given file extension."""
    _parsers: list[DataParser] = [
        JSONParser(),
        YAMLParser(),
        TOMLParser(),
        CSVParser(),
        # Tab-separated files need their own delimiter
        CSVParser(delimiter="\t", extensions=(".tsv",)),
    ]

    @classmethod
    def register(cls, parser: DataParser) -> None:
        """Register a new parser - Open/Closed: extend without modifying."""
        cls._parsers.append(parser)

    @classmethod
    def for_file(cls, filepath: str | Path) -> DataParser:
        """Return the appropriate parser for the given file path."""
        suffix = Path(filepath).suffix.lower()
        for parser in cls._parsers:
            if suffix in parser.supported_extensions():
                return parser
        supported = [e for p in cls._parsers for e in p.supported_extensions()]
        raise ValueError(
            f"No parser registered for extension '{suffix}'. "
            f"Supported: {supported}"
        )

    @classmethod
    def load(cls, filepath: str | Path) -> Any:
        """Convenience method: get parser and load in one call."""
        return cls.for_file(filepath).load_file(filepath)
Usage is now uniform:
# data_loader.py - no format-specific logic
def load_dataset(filepath: str) -> Any:
    return ParserFactory.load(filepath)

# Adding Parquet support - zero changes to existing code
class ParquetParser(DataParser):
    def parse(self, content: bytes) -> Any:
        import pandas as pd
        import io
        return pd.read_parquet(io.BytesIO(content))

    def supported_extensions(self) -> tuple[str, ...]:
        return (".parquet",)

    def _needs_binary(self) -> bool:
        return True

ParserFactory.register(ParquetParser())
# Now load_dataset("data.parquet") works everywhere
Pythonic Variant: Dictionary of Callables
For simpler cases where construction is trivial, a dictionary is the factory:
import json
import yaml
import tomllib
from typing import Callable, Any

_PARSERS: dict[str, Callable[[str | bytes], Any]] = {
    ".json": json.loads,
    ".yaml": yaml.safe_load,
    ".yml": yaml.safe_load,
    ".toml": lambda b: tomllib.loads(b.decode() if isinstance(b, bytes) else b),
}

def get_parser(extension: str) -> Callable[[str | bytes], Any]:
    if extension not in _PARSERS:
        raise ValueError(f"No parser for '{extension}'")
    return _PARSERS[extension]

def register_parser(extension: str, parser: Callable) -> None:
    _PARSERS[extension] = parser
Use the ABC approach when parsers have complex state or multiple methods. Use the dictionary approach when the factory creates simple, stateless callables.
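A third option sits between the two: let subclasses register themselves. The sketch below uses `__init_subclass__` for that, so defining a parser class is enough to make it discoverable. `AutoParser`, `JSONAutoParser`, `KeyValueParser`, and `parser_for` are hypothetical names, and the key=value format is only an illustration.

```python
import json
from pathlib import Path
from typing import Any, ClassVar


class AutoParser:
    """Base class whose subclasses register themselves by file extension."""

    registry: ClassVar[dict[str, type["AutoParser"]]] = {}
    extensions: ClassVar[tuple[str, ...]] = ()

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        for ext in cls.extensions:
            AutoParser.registry[ext] = cls

    def parse(self, text: str) -> Any:
        raise NotImplementedError


class JSONAutoParser(AutoParser):
    extensions = (".json",)

    def parse(self, text: str) -> Any:
        return json.loads(text)


class KeyValueParser(AutoParser):
    extensions = (".env", ".properties")

    def parse(self, text: str) -> dict[str, str]:
        pairs = (line.split("=", 1) for line in text.splitlines() if "=" in line)
        return {key.strip(): value.strip() for key, value in pairs}


def parser_for(path: str) -> AutoParser:
    suffix = Path(path).suffix.lower()
    try:
        return AutoParser.registry[suffix]()
    except KeyError:
        raise ValueError(f"No parser registered for '{suffix}'") from None


print(parser_for("data.json").parse('{"a": 1}'))
print(parser_for("app.properties").parse("x = 1\ny=2"))
```

The trade-off is that registration happens as an import side effect: a parser module that is never imported is never registered.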
Part 3 - Abstract Factory
The Problem: Inconsistent Families of Objects
# PROBLEM: AWS and GCP clients mixed accidentally
class DataPipeline:
    def __init__(self):
        # Mixed providers - accident waiting to happen
        self.storage = AWSStorage()  # AWS S3
        self.compute = GCPCompute()  # Google Cloud Run
        self.queue = AWSQueue()      # AWS SQS

    def process(self, data):
        # Storage and queue are AWS, compute is GCP
        # Credentials, regions, error formats all differ
        # This will break in non-obvious ways
        ...
The Abstract Factory pattern creates families of related objects. All objects in a family are consistent with each other. You cannot mix an AWS storage adapter with a GCP compute adapter when they are created through the same factory.
The Abstract Factory Pattern
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterator

# --- Abstract Products ---
class StorageBackend(ABC):
    @abstractmethod
    def upload(self, key: str, data: bytes) -> str:
        """Upload data, return URI."""
        ...

    @abstractmethod
    def download(self, key: str) -> bytes:
        ...

    @abstractmethod
    def list_objects(self, prefix: str) -> Iterator[str]:
        ...

class ComputeBackend(ABC):
    @abstractmethod
    def submit_job(self, image: str, command: list[str], env: dict[str, str]) -> str:
        """Submit a job, return job ID."""
        ...

    @abstractmethod
    def get_job_status(self, job_id: str) -> str:
        """Return 'pending', 'running', 'completed', or 'failed'."""
        ...

class QueueBackend(ABC):
    @abstractmethod
    def publish(self, topic: str, message: dict) -> None:
        ...

    @abstractmethod
    def consume(self, topic: str, max_messages: int = 10) -> list[dict]:
        ...

# --- Abstract Factory ---
class CloudProviderFactory(ABC):
    """Creates a family of related cloud backends."""

    @abstractmethod
    def create_storage(self) -> StorageBackend:
        ...

    @abstractmethod
    def create_compute(self) -> ComputeBackend:
        ...

    @abstractmethod
    def create_queue(self) -> QueueBackend:
        ...
# --- Concrete Products: AWS ---
class AWSS3Storage(StorageBackend):
    def __init__(self, bucket: str, region: str):
        self._bucket = bucket
        self._region = region

    def upload(self, key: str, data: bytes) -> str:
        # boto3: self._client.put_object(Bucket=self._bucket, Key=key, Body=data)
        return f"s3://{self._bucket}/{key}"

    def download(self, key: str) -> bytes:
        # boto3: response = self._client.get_object(Bucket=self._bucket, Key=key)
        return b""

    def list_objects(self, prefix: str) -> Iterator[str]:
        # boto3: paginator = self._client.get_paginator("list_objects_v2")
        return iter([])

class AWSFargateCompute(ComputeBackend):
    def __init__(self, cluster: str, task_definition: str, region: str):
        self._cluster = cluster
        self._task_def = task_definition
        self._region = region

    def submit_job(self, image: str, command: list[str], env: dict[str, str]) -> str:
        # boto3: ECS RunTask call
        return f"ecs-task-{hash(image)}"

    def get_job_status(self, job_id: str) -> str:
        return "running"

class AWSSQSQueue(QueueBackend):
    def __init__(self, queue_url: str):
        self._queue_url = queue_url

    def publish(self, topic: str, message: dict) -> None:
        import json
        # boto3: self._client.send_message(QueueUrl=self._queue_url, MessageBody=json.dumps(message))
        pass

    def consume(self, topic: str, max_messages: int = 10) -> list[dict]:
        return []

# --- Concrete Factory: AWS ---
@dataclass
class AWSConfig:
    region: str
    s3_bucket: str
    ecs_cluster: str
    ecs_task_definition: str
    sqs_queue_url: str

class AWSFactory(CloudProviderFactory):
    def __init__(self, config: AWSConfig):
        self._config = config

    def create_storage(self) -> StorageBackend:
        return AWSS3Storage(self._config.s3_bucket, self._config.region)

    def create_compute(self) -> ComputeBackend:
        return AWSFargateCompute(
            self._config.ecs_cluster,
            self._config.ecs_task_definition,
            self._config.region,
        )

    def create_queue(self) -> QueueBackend:
        return AWSSQSQueue(self._config.sqs_queue_url)
# --- Concrete Products: GCP ---
class GCSStorage(StorageBackend):
    def __init__(self, bucket: str, project: str):
        self._bucket = bucket
        self._project = project

    def upload(self, key: str, data: bytes) -> str:
        return f"gs://{self._bucket}/{key}"

    def download(self, key: str) -> bytes:
        return b""

    def list_objects(self, prefix: str) -> Iterator[str]:
        return iter([])

class GCPCloudRunCompute(ComputeBackend):
    def __init__(self, project: str, region: str):
        self._project = project
        self._region = region

    def submit_job(self, image: str, command: list[str], env: dict[str, str]) -> str:
        return f"cloudrun-job-{hash(image)}"

    def get_job_status(self, job_id: str) -> str:
        return "running"

class GCPPubSubQueue(QueueBackend):
    def __init__(self, project: str):
        self._project = project

    def publish(self, topic: str, message: dict) -> None:
        pass

    def consume(self, topic: str, max_messages: int = 10) -> list[dict]:
        return []

# --- Concrete Factory: GCP ---
@dataclass
class GCPConfig:
    project: str
    region: str
    gcs_bucket: str

class GCPFactory(CloudProviderFactory):
    def __init__(self, config: GCPConfig):
        self._config = config

    def create_storage(self) -> StorageBackend:
        return GCSStorage(self._config.gcs_bucket, self._config.project)

    def create_compute(self) -> ComputeBackend:
        return GCPCloudRunCompute(self._config.project, self._config.region)

    def create_queue(self) -> QueueBackend:
        return GCPPubSubQueue(self._config.project)
Application Code That Does Not Know the Cloud Provider
# data_pipeline.py - zero AWS/GCP imports, zero provider-specific logic
class DataPipeline:
    """Processes data using cloud backends injected at construction."""

    def __init__(self, factory: CloudProviderFactory):
        # All three backends are from the SAME provider - guaranteed consistent
        self.storage = factory.create_storage()
        self.compute = factory.create_compute()
        self.queue = factory.create_queue()

    def process_batch(self, input_key: str, output_key: str) -> str:
        # Download input
        data = self.storage.download(input_key)
        # Submit compute job
        job_id = self.compute.submit_job(
            image="data-processor:latest",
            command=["python", "process.py", "--input", input_key],
            env={"OUTPUT_KEY": output_key},
        )
        # Notify downstream
        self.queue.publish(
            topic="pipeline-events",
            message={"event": "job_submitted", "job_id": job_id, "input": input_key},
        )
        return job_id
Switching from AWS to GCP is a configuration change at the composition root:
import os

def build_factory() -> CloudProviderFactory:
    provider = os.environ.get("CLOUD_PROVIDER", "aws")
    if provider == "aws":
        return AWSFactory(AWSConfig(
            region=os.environ["AWS_REGION"],
            s3_bucket=os.environ["S3_BUCKET"],
            ecs_cluster=os.environ["ECS_CLUSTER"],
            ecs_task_definition=os.environ["ECS_TASK_DEF"],
            sqs_queue_url=os.environ["SQS_QUEUE_URL"],
        ))
    if provider == "gcp":
        return GCPFactory(GCPConfig(
            project=os.environ["GCP_PROJECT"],
            region=os.environ["GCP_REGION"],
            gcs_bucket=os.environ["GCS_BUCKET"],
        ))
    raise ValueError(f"Unknown provider: {provider}")
# In tests - no real cloud needed
class InMemoryStorage(StorageBackend):
    def __init__(self):
        self._store: dict[str, bytes] = {}

    def upload(self, key: str, data: bytes) -> str:
        self._store[key] = data
        return f"memory://{key}"

    def download(self, key: str) -> bytes:
        return self._store[key]

    def list_objects(self, prefix: str) -> Iterator[str]:
        return (k for k in self._store if k.startswith(prefix))

class MockCompute(ComputeBackend):
    def __init__(self):
        self.submitted_jobs: list[dict] = []

    def submit_job(self, image: str, command: list[str], env: dict[str, str]) -> str:
        job_id = f"mock-job-{len(self.submitted_jobs)}"
        self.submitted_jobs.append({"id": job_id, "image": image, "command": command})
        return job_id

    def get_job_status(self, job_id: str) -> str:
        return "completed"

class MockQueue(QueueBackend):
    def __init__(self):
        self.published: list[dict] = []

    def publish(self, topic: str, message: dict) -> None:
        self.published.append({"topic": topic, "message": message})

    def consume(self, topic: str, max_messages: int = 10) -> list[dict]:
        return [m for m in self.published if m["topic"] == topic]

class InMemoryFactory(CloudProviderFactory):
    def create_storage(self) -> StorageBackend:
        return InMemoryStorage()

    def create_compute(self) -> ComputeBackend:
        return MockCompute()

    def create_queue(self) -> QueueBackend:
        return MockQueue()
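# Test
def test_pipeline_publishes_event_after_job_submission():
    pipeline = DataPipeline(InMemoryFactory())
    # Seed the input so download() finds it
    pipeline.storage.upload("input/data.parquet", b"raw-bytes")

    pipeline.process_batch("input/data.parquet", "output/result.parquet")

    # Inspect the queue the pipeline actually uses; calling
    # factory.create_queue() again would return a new, empty MockQueue
    published = pipeline.queue.published
    assert len(published) == 1
    assert published[0]["topic"] == "pipeline-events"
    assert published[0]["message"]["event"] == "job_submitted"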
Abstract Factory vs Factory Method
| Aspect | Factory Method | Abstract Factory |
|---|---|---|
| Creates | One product | A family of related products |
| Extension point | Override the factory method in a subclass | Provide a new factory class |
| Use when | You have one type of object to create | You have groups of related objects |
| Example | Parser for one file format | All cloud backends for one provider |
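The same structure applies well outside cloud infrastructure. As a compact second illustration, here is a sketch of a UI theme factory. The names (`ThemeFactory`, `LightTheme`, `DarkTheme`, `build_dialog`) and colour values are hypothetical, and the products are plain dataclasses rather than separate abstract hierarchies to keep the sketch short.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class Button:
    label: str
    background: str
    foreground: str


@dataclass(frozen=True)
class Panel:
    background: str
    border: str


class ThemeFactory(ABC):
    """Creates widgets that are guaranteed to share one colour scheme."""

    @abstractmethod
    def create_button(self, label: str) -> Button: ...

    @abstractmethod
    def create_panel(self) -> Panel: ...


class LightTheme(ThemeFactory):
    def create_button(self, label: str) -> Button:
        return Button(label, background="#ffffff", foreground="#111111")

    def create_panel(self) -> Panel:
        return Panel(background="#f5f5f5", border="#cccccc")


class DarkTheme(ThemeFactory):
    def create_button(self, label: str) -> Button:
        return Button(label, background="#222222", foreground="#eeeeee")

    def create_panel(self) -> Panel:
        return Panel(background="#111111", border="#444444")


def build_dialog(theme: ThemeFactory) -> tuple[Panel, Button]:
    # Client code never names a concrete theme
    return theme.create_panel(), theme.create_button("OK")


panel, button = build_dialog(DarkTheme())
print(panel, button)
```

Because `build_dialog` receives one factory, a dark panel can never end up paired with a light button.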
Part 4 - Builder
The Problem: Constructor Explosion
# PROBLEM: A model training config with 20 parameters
class TrainingConfig:
    def __init__(
        self,
        model_name,
        learning_rate=0.001,
        batch_size=32,
        epochs=10,
        optimizer="adam",
        loss_function="categorical_crossentropy",
        dropout_rate=0.2,
        weight_decay=1e-5,
        warmup_steps=500,
        lr_scheduler="cosine",
        gradient_clip=1.0,
        mixed_precision=False,
        device="cuda",
        num_workers=4,
        checkpoint_every=1,
        early_stopping_patience=5,
        validation_split=0.1,
        seed=42,
        log_dir="./logs",
        output_dir="./outputs",
    ):
        ...

# Calling this is a nightmare:
config = TrainingConfig(
    "bert-base-uncased",
    0.0003,
    16,
    20,
    "adamw",
    "cross_entropy",
    0.1,
    1e-4,
    1000,
    "linear",
    0.5,
    True,
    "cuda",
    8,
    2,
    3,
    0.15,
    123,
    "./experiment_logs",
    "./experiment_outputs",
)
# What is 0.1 here? What is 0.5? What is 123?
# Nobody knows without reading the constructor signature.
The Builder pattern separates the construction of a complex object from its representation. It allows you to produce different configurations using the same construction process, and makes the construction readable.
The Builder Pattern: Fluent API
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Literal
import copy

# --- The Product ---
@dataclass(frozen=True)  # frozen so the docstring's "immutable" actually holds
class TrainingConfig:
    """Immutable training configuration."""
    model_name: str
    learning_rate: float = 3e-4
    batch_size: int = 32
    epochs: int = 10
    optimizer: Literal["adam", "adamw", "sgd", "rmsprop"] = "adamw"
    loss_function: str = "cross_entropy"
    dropout_rate: float = 0.1
    weight_decay: float = 1e-4
    warmup_steps: int = 1000
    lr_scheduler: Literal["cosine", "linear", "constant", "polynomial"] = "cosine"
    gradient_clip: float = 1.0
    mixed_precision: bool = True
    device: str = "cuda"
    num_workers: int = 4
    checkpoint_every: int = 1
    early_stopping_patience: int = 5
    validation_split: float = 0.1
    seed: int = 42
    log_dir: str = "./logs"
    output_dir: str = "./outputs"
    tags: list[str] = field(default_factory=list)

    def __post_init__(self):
        if not 0 < self.learning_rate < 1:
            raise ValueError(
                f"learning_rate must be in (0, 1), got {self.learning_rate}"
            )
        if self.batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {self.batch_size}")
        if not 0 <= self.validation_split < 1:
            raise ValueError(
                f"validation_split must be in [0, 1), got {self.validation_split}"
            )
# --- The Builder ---
class TrainingConfigBuilder:
    """Fluent builder for TrainingConfig."""

    def __init__(self, model_name: str):
        self._params: dict = {"model_name": model_name}

    # --- Learning rate and optimization ---
    def with_learning_rate(self, lr: float) -> TrainingConfigBuilder:
        self._params["learning_rate"] = lr
        return self

    def with_optimizer(
        self,
        optimizer: Literal["adam", "adamw", "sgd", "rmsprop"],
        weight_decay: float = 1e-4,
    ) -> TrainingConfigBuilder:
        self._params["optimizer"] = optimizer
        self._params["weight_decay"] = weight_decay
        return self

    def with_lr_schedule(
        self,
        scheduler: Literal["cosine", "linear", "constant", "polynomial"],
        warmup_steps: int = 1000,
    ) -> TrainingConfigBuilder:
        self._params["lr_scheduler"] = scheduler
        self._params["warmup_steps"] = warmup_steps
        return self

    # --- Training loop ---
    def for_epochs(
        self, epochs: int, early_stopping_patience: int = 5
    ) -> TrainingConfigBuilder:
        self._params["epochs"] = epochs
        self._params["early_stopping_patience"] = early_stopping_patience
        return self

    def with_batch_size(self, batch_size: int) -> TrainingConfigBuilder:
        self._params["batch_size"] = batch_size
        return self

    def with_mixed_precision(self, enabled: bool = True) -> TrainingConfigBuilder:
        self._params["mixed_precision"] = enabled
        return self

    def with_gradient_clipping(self, max_norm: float) -> TrainingConfigBuilder:
        self._params["gradient_clip"] = max_norm
        return self

    # --- Regularization ---
    def with_dropout(self, rate: float) -> TrainingConfigBuilder:
        self._params["dropout_rate"] = rate
        return self

    # --- Infrastructure ---
    def on_device(self, device: str) -> TrainingConfigBuilder:
        self._params["device"] = device
        return self

    def with_num_workers(self, workers: int) -> TrainingConfigBuilder:
        self._params["num_workers"] = workers
        return self

    def saving_to(
        self,
        output_dir: str,
        log_dir: str = "./logs",
        checkpoint_every: int = 1,
    ) -> TrainingConfigBuilder:
        self._params["output_dir"] = output_dir
        self._params["log_dir"] = log_dir
        self._params["checkpoint_every"] = checkpoint_every
        return self

    def with_tags(self, *tags: str) -> TrainingConfigBuilder:
        self._params.setdefault("tags", []).extend(tags)
        return self

    def with_seed(self, seed: int) -> TrainingConfigBuilder:
        self._params["seed"] = seed
        return self

    # --- Build ---
    def build(self) -> TrainingConfig:
        """Construct and validate the TrainingConfig."""
        # deepcopy so the config does not share the builder's mutable tags list
        return TrainingConfig(**copy.deepcopy(self._params))

    def build_for_debugging(self) -> TrainingConfig:
        """Build a fast config for smoke-testing the pipeline."""
        debug_params = copy.deepcopy(self._params)
        debug_params.update({
            "epochs": 1,
            "batch_size": 4,
            "mixed_precision": False,
            "device": "cpu",
            "num_workers": 0,
            "early_stopping_patience": 1,
        })
        return TrainingConfig(**debug_params)
Now construction is readable, searchable, and self-documenting:
# Production config - reads like English
production_config = (
    TrainingConfigBuilder("bert-base-uncased")
    .with_learning_rate(3e-4)
    .with_optimizer("adamw", weight_decay=1e-4)
    .with_lr_schedule("cosine", warmup_steps=1000)
    .for_epochs(20, early_stopping_patience=3)
    .with_batch_size(16)
    .with_mixed_precision(True)
    .with_dropout(0.1)
    .on_device("cuda")
    .saving_to("./outputs/bert-run-001", checkpoint_every=2)
    .with_tags("bert", "finetuning", "squad")
    .with_seed(42)
    .build()
)

# Quick debug config - reuse the same builder shape
debug_config = (
    TrainingConfigBuilder("bert-base-uncased")
    .with_learning_rate(3e-4)
    .with_optimizer("adamw")
    .build_for_debugging()  # overrides to fast settings
)
Director: Encapsulating Standard Configurations
class TrainingConfigDirector:
    """Encapsulates common build recipes."""

    @staticmethod
    def fine_tuning_recipe(model_name: str, output_dir: str) -> TrainingConfig:
        return (
            TrainingConfigBuilder(model_name)
            .with_learning_rate(2e-5)  # small LR for fine-tuning
            .with_optimizer("adamw", weight_decay=0.01)
            .with_lr_schedule("linear", warmup_steps=500)
            .for_epochs(3)  # typically 3-5 epochs for fine-tuning
            .with_batch_size(32)
            .with_mixed_precision(True)
            .saving_to(output_dir)
            .build()
        )

    @staticmethod
    def pretraining_recipe(model_name: str, output_dir: str) -> TrainingConfig:
        return (
            TrainingConfigBuilder(model_name)
            .with_learning_rate(1e-4)
            .with_optimizer("adamw", weight_decay=0.1)
            .with_lr_schedule("cosine", warmup_steps=10000)
            .for_epochs(100, early_stopping_patience=10)
            .with_batch_size(256)
            .with_mixed_precision(True)
            .with_gradient_clipping(1.0)
            .saving_to(output_dir, checkpoint_every=5)
            .build()
        )

# Usage
config = TrainingConfigDirector.fine_tuning_recipe(
    model_name="distilbert-base-uncased",
    output_dir="./outputs/distilbert-ft",
)
Builder + Dataclass __post_init__
Python dataclasses complement the Builder pattern well. The Builder constructs the arguments; the dataclass __post_init__ validates them. This keeps validation logic in the product class, not scattered in the builder methods:
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Literal

@dataclass
class HTTPRequestConfig:
    url: str
    method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = "GET"
    headers: dict[str, str] = field(default_factory=dict)
    body: bytes | None = None
    timeout_seconds: float = 30.0
    retries: int = 3
    retry_backoff: float = 0.5
    follow_redirects: bool = True

    def __post_init__(self):
        if not self.url.startswith(("http://", "https://")):
            raise ValueError(
                f"URL must start with http:// or https://: {self.url}"
            )
        if self.method in ("GET", "DELETE") and self.body is not None:
            raise ValueError(f"{self.method} requests should not have a body")
        if self.timeout_seconds <= 0:
            raise ValueError("timeout_seconds must be positive")
        if self.retries < 0:
            raise ValueError("retries cannot be negative")
class HTTPRequestBuilder:
def __init__(self, url: str):
self._url = url
self._method = "GET"
self._headers: dict[str, str] = {}
self._body: bytes | None = None
self._timeout = 30.0
self._retries = 3
self._retry_backoff = 0.5
def post(self, body: dict | bytes | str) -> HTTPRequestBuilder:
self._method = "POST"
if isinstance(body, dict):
import json
self._body = json.dumps(body).encode()
self._headers.setdefault("Content-Type", "application/json")
elif isinstance(body, str):
self._body = body.encode()
else:
self._body = body
return self
def put(self, body: dict | bytes | str) -> HTTPRequestBuilder:
self.post(body) # reuse body handling logic (this sets the method to POST)
self._method = "PUT" # override the method after post() has run
return self
def with_header(self, key: str, value: str) -> HTTPRequestBuilder:
self._headers[key] = value
return self
def with_bearer_token(self, token: str) -> HTTPRequestBuilder:
self._headers["Authorization"] = f"Bearer {token}"
return self
def with_timeout(self, seconds: float) -> HTTPRequestBuilder:
self._timeout = seconds
return self
def with_retries(self, count: int, backoff: float = 0.5) -> HTTPRequestBuilder:
self._retries = count
self._retry_backoff = backoff
return self
def build(self) -> HTTPRequestConfig:
return HTTPRequestConfig(
url=self._url,
method=self._method,
headers=dict(self._headers), # copy so reusing the builder cannot mutate this config
body=self._body,
timeout_seconds=self._timeout,
retries=self._retries,
retry_backoff=self._retry_backoff,
)
# Usage
request = (
HTTPRequestBuilder("https://api.example.com/users")
.post({"name": "Alice", "role": "admin"})
.with_bearer_token("eyJhbGci...")
.with_timeout(10.0)
.with_retries(3, backoff=1.0)
.build()
)
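To see the division of labour in isolation, here is a minimal sketch using trimmed-down stand-ins (RequestConfig and RequestBuilder are hypothetical names, not the classes above). The builder method stores the value without checking it, and the error surfaces from __post_init__ when build() constructs the product:

```python
from dataclasses import dataclass


@dataclass
class RequestConfig:  # hypothetical, trimmed-down product
    url: str
    timeout_seconds: float = 30.0

    def __post_init__(self):
        if not self.url.startswith(("http://", "https://")):
            raise ValueError(f"URL must start with http:// or https://: {self.url}")
        if self.timeout_seconds <= 0:
            raise ValueError("timeout_seconds must be positive")


class RequestBuilder:  # hypothetical, trimmed-down builder
    def __init__(self, url: str):
        self._url = url
        self._timeout = 30.0

    def with_timeout(self, seconds: float) -> "RequestBuilder":
        self._timeout = seconds  # deliberately no validation here
        return self

    def build(self) -> RequestConfig:
        # Validation runs inside RequestConfig.__post_init__
        return RequestConfig(url=self._url, timeout_seconds=self._timeout)


try:
    RequestBuilder("https://api.example.com").with_timeout(-1).build()
except ValueError as exc:
    print(exc)  # timeout_seconds must be positive

ok = RequestBuilder("https://api.example.com").with_timeout(5).build()
print(ok.timeout_seconds)  # 5
```

The trade-off of this design is that invalid input is reported at build() time, not at the builder call that introduced it.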
Part 5 - Prototype
The Problem: Expensive Object Creation
# PROBLEM: Creating neural network layer configs by hand, repeatedly
layer1 = {
"type": "transformer",
"hidden_size": 768,
"num_attention_heads": 12,
"intermediate_size": 3072,
"hidden_dropout_prob": 0.1,
"attention_probs_dropout_prob": 0.1,
"activation_fn": "gelu",
"layer_norm_eps": 1e-12,
"initializer_range": 0.02,
}
# For a 12-layer model, copy-paste this dict 12 times.
# Need to change dropout for layers 10-12? Update 3 dicts manually.
# Need a variant with different hidden size? Copy all 12 again.
The Prototype pattern creates new objects by copying an existing object (the prototype). It is the correct pattern when:
- Object creation is expensive (loaded from disk, computed from scratch)
- You need many objects that differ in only a few attributes
- You want to avoid the complexity of a class hierarchy for variants
copy.copy vs copy.deepcopy
Before implementing Prototype, understand the difference:
import copy
# Shallow copy: copies the object, but nested objects are shared
original = {"layers": [{"size": 768}, {"size": 768}]}
shallow = copy.copy(original)
shallow["layers"][0]["size"] = 512
print(original["layers"][0]["size"]) # 512 - original was mutated!
# Deep copy: recursively copies all nested objects
original = {"layers": [{"size": 768}, {"size": 768}]}
deep = copy.deepcopy(original)
deep["layers"][0]["size"] = 256
print(original["layers"][0]["size"]) # 768 - original unchanged
The rule: use copy.copy when the object has no mutable nested state you want to isolate. Use copy.deepcopy when nested structures must be independent.
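The other half of the rule can be illustrated with a small sketch. It assumes a hypothetical settings dict whose nested values are all immutable; in that case a shallow copy is enough, because changing the copy means rebinding a key, which never touches the original:

```python
import copy

# Hypothetical config whose nested values are all immutable
settings = {"shape": (768, 12), "activation": "gelu"}

clone = copy.copy(settings)
clone["shape"] = (1024, 16)  # rebinds the key on the clone only

print(settings["shape"])  # (768, 12)
print(clone["shape"])     # (1024, 16)

# The untouched value is still shared, which is harmless for immutables
print(clone["activation"] is settings["activation"])  # True
```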
The Prototype Pattern with __copy__ and __deepcopy__
import copy
from dataclasses import dataclass, field
from typing import Any
@dataclass
class LayerConfig:
"""Configuration for a single transformer layer."""
layer_type: str = "transformer"
hidden_size: int = 768
num_attention_heads: int = 12
intermediate_size: int = 3072
hidden_dropout_prob: float = 0.1
attention_probs_dropout_prob: float = 0.1
activation_fn: str = "gelu"
layer_norm_eps: float = 1e-12
initializer_range: float = 0.02
# Mutable: per-layer metadata tags
metadata: dict[str, Any] = field(default_factory=dict)
def __copy__(self) -> "LayerConfig":
"""Shallow copy: shares the metadata dict."""
cls = self.__class__
new = cls.__new__(cls)
new.__dict__.update(self.__dict__)
return new
def __deepcopy__(self, memo: dict) -> "LayerConfig":
"""Deep copy: independent metadata dict."""
cls = self.__class__
new = cls.__new__(cls)
memo[id(self)] = new
for k, v in self.__dict__.items():
setattr(new, k, copy.deepcopy(v, memo))
return new
def with_overrides(self, **kwargs) -> "LayerConfig":
"""Return a deep-copied variant with specific fields changed."""
clone = copy.deepcopy(self)
for key, value in kwargs.items():
if not hasattr(clone, key):
raise AttributeError(f"LayerConfig has no attribute '{key}'")
setattr(clone, key, value)
return clone
@dataclass
class ModelConfig:
"""Prototype-based model configuration."""
model_name: str
num_layers: int
layer_configs: list[LayerConfig] = field(default_factory=list)
vocab_size: int = 30522
max_position_embeddings: int = 512
def __post_init__(self):
if not self.layer_configs:
# All layers start as deep copies of a base config
base = LayerConfig(hidden_size=768, num_attention_heads=12)
self.layer_configs = [copy.deepcopy(base) for _ in range(self.num_layers)]
def __deepcopy__(self, memo: dict) -> "ModelConfig":
cls = self.__class__
new = cls.__new__(cls)
memo[id(self)] = new
new.model_name = self.model_name
new.num_layers = self.num_layers
new.vocab_size = self.vocab_size
new.max_position_embeddings = self.max_position_embeddings
new.layer_configs = [copy.deepcopy(lc, memo) for lc in self.layer_configs]
return new
def clone_with_dropout(self, dropout: float) -> "ModelConfig":
"""Clone this config with a different dropout rate on all layers."""
new_config = copy.deepcopy(self)
for layer in new_config.layer_configs:
layer.hidden_dropout_prob = dropout
layer.attention_probs_dropout_prob = dropout
return new_config
def clone_with_stochastic_depth(self, max_dropout: float) -> "ModelConfig":
"""Clone with linearly increasing hidden dropout per layer (a stochastic-depth-style schedule)."""
new_config = copy.deepcopy(self)
n = max(len(new_config.layer_configs) - 1, 1)
for i, layer in enumerate(new_config.layer_configs):
rate = round((i / n) * max_dropout, 4)
layer.hidden_dropout_prob = rate
return new_config
The Prototype Registry
A prototype registry stores pre-built prototypes and clones them on demand:
class ModelConfigRegistry:
"""Registry of pre-built model configuration prototypes."""
_prototypes: dict[str, ModelConfig] = {}
@classmethod
def register(cls, name: str, config: ModelConfig) -> None:
cls._prototypes[name] = config
@classmethod
def clone(cls, name: str, **overrides) -> ModelConfig:
"""Clone a registered prototype, applying field overrides."""
if name not in cls._prototypes:
available = list(cls._prototypes.keys())
raise KeyError(f"No prototype '{name}'. Available: {available}")
proto = copy.deepcopy(cls._prototypes[name])
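for key, value in overrides.items():
if not hasattr(proto, key): # reject typos instead of silently adding attributes
raise AttributeError(f"ModelConfig has no attribute '{key}'")
setattr(proto, key, value)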
return proto
@classmethod
def list_available(cls) -> list[str]:
return list(cls._prototypes.keys())
# Register the base prototypes once (expensive initialization done once)
ModelConfigRegistry.register(
"bert-base",
ModelConfig(model_name="bert-base", num_layers=12),
)
ModelConfigRegistry.register(
"bert-large",
ModelConfig(
model_name="bert-large",
num_layers=24,
layer_configs=[
LayerConfig(
hidden_size=1024,
num_attention_heads=16,
intermediate_size=4096,
)
for _ in range(24)
],
),
)
# Clone and customize cheaply - no re-initialization
high_dropout = ModelConfigRegistry.clone(
"bert-base", model_name="bert-base-high-dropout"
)
high_dropout = high_dropout.clone_with_dropout(0.2)
stochastic = ModelConfigRegistry.clone(
"bert-large", model_name="bert-large-sd"
)
stochastic = stochastic.clone_with_stochastic_depth(max_dropout=0.3)
# Both configs are fully independent - modifying one does not affect the registry
assert ModelConfigRegistry.clone("bert-base").layer_configs[0].hidden_dropout_prob == 0.1
Custom __deepcopy__ for Performance
For objects without a custom copy hook, copy.deepcopy falls back to the same __reduce_ex__ protocol that pickle uses. This is correct but generic, and it can be slow for large structures. Implementing __deepcopy__ directly lets you control the copy path when you know the object's shape:
import time
import copy
from dataclasses import dataclass
@dataclass
class HeavyConfig:
"""Config with flat nested dicts - custom copy is faster than pickle path."""
data: list[dict]
metadata: dict
def __deepcopy__(self, memo: dict) -> "HeavyConfig":
# Fast path: we know the structure - skip pickle overhead
return HeavyConfig(
data=[dict(d) for d in self.data], # list of flat dicts: simple copy
metadata=dict(self.metadata), # flat dict: simple copy
)
# When the structure is known and flat (no nested mutables, no circular refs),
# a custom __deepcopy__ can be several times faster. Measure on your own data.
large_config = HeavyConfig(
data=[{"key": f"val{i}", "size": i} for i in range(10000)],
metadata={"version": "1.0", "author": "team"},
)
t0 = time.perf_counter()
for _ in range(1000):
_ = copy.deepcopy(large_config)
elapsed = time.perf_counter() - t0
print(f"1000 deep copies: {elapsed:.3f}s")
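The timing above only measures the custom path. A side-by-side comparison could look like the sketch below, which assumes two hypothetical classes with identical fields (DefaultCopyConfig and FastCopyConfig) and a smaller dataset so it runs quickly. The printed timings vary by machine and Python version, so treat them as a relative comparison only:

```python
import copy
import timeit
from dataclasses import dataclass


@dataclass
class DefaultCopyConfig:
    """Relies on the generic copy.deepcopy machinery."""
    data: list[dict]
    metadata: dict


@dataclass
class FastCopyConfig:
    """Same fields, but with a hand-written copy path for flat dicts."""
    data: list[dict]
    metadata: dict

    def __deepcopy__(self, memo: dict) -> "FastCopyConfig":
        new = FastCopyConfig(
            data=[dict(d) for d in self.data],  # valid only for flat dicts
            metadata=dict(self.metadata),
        )
        memo[id(self)] = new  # keep the memo consistent for callers
        return new


rows = [{"key": f"val{i}", "size": i} for i in range(1000)]
meta = {"version": "1.0"}
default_cfg = DefaultCopyConfig(data=rows, metadata=meta)
fast_cfg = FastCopyConfig(data=rows, metadata=meta)

default_s = timeit.timeit(lambda: copy.deepcopy(default_cfg), number=50)
fast_s = timeit.timeit(lambda: copy.deepcopy(fast_cfg), number=50)
print(f"default: {default_s:.4f}s  custom: {fast_s:.4f}s")

# The custom path must still produce an independent copy
fast_clone = copy.deepcopy(fast_cfg)
fast_clone.data[0]["size"] = -1
print(fast_cfg.data[0]["size"])  # 0
```

The shortcut is only safe while every value inside the dicts is immutable. If a nested list or dict is added later, the custom method silently becomes a shallow copy for that field.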
Part 6 - Python-Specific Creational Patterns
functools.lru_cache as a Factory Registry
from functools import lru_cache
import importlib
@lru_cache(maxsize=None)
def get_model_class(model_name: str) -> type:
"""Load and cache model class by name - acts as a factory registry."""
module_path, class_name = model_name.rsplit(".", 1)
module = importlib.import_module(module_path)
return getattr(module, class_name)
# First call loads the module (expensive - disk I/O, C extension init)
# Subsequent calls return the cached class (cheap - dict lookup)
BERTModel = get_model_class("transformers.models.bert.modeling_bert.BertModel")
BERTModel2 = get_model_class("transformers.models.bert.modeling_bert.BertModel")
assert BERTModel is BERTModel2 # True - cached
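If transformers is not installed, the same idea can be tried with a standard-library class. The sketch below is a variant under that assumption: load_class is a hypothetical name, and cache_info() is used to confirm that the second call was served from the cache:

```python
import importlib
from functools import lru_cache


@lru_cache(maxsize=None)
def load_class(dotted_path: str) -> type:
    """Resolve 'package.module.ClassName' to the class object, cached."""
    module_path, class_name = dotted_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


first = load_class("collections.OrderedDict")
second = load_class("collections.OrderedDict")

print(first is second)  # True
info = load_class.cache_info()
print(info.hits, info.misses)  # 1 1

# The returned class is used like any other factory product
instance = first(a=1)
print(instance["a"])  # 1
```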
Class Decorator as Factory
_HANDLER_REGISTRY: dict[str, type] = {}
def register_handler(event_type: str):
"""Class decorator that registers a handler class for an event type."""
def decorator(cls):
_HANDLER_REGISTRY[event_type] = cls
return cls
return decorator
@register_handler("user.created")
class UserCreatedHandler:
def handle(self, event: dict) -> None:
print(f"Sending welcome email to {event['email']}")
@register_handler("order.placed")
class OrderPlacedHandler:
def handle(self, event: dict) -> None:
print(f"Processing order {event['order_id']}")
@register_handler("payment.failed")
class PaymentFailedHandler:
def handle(self, event: dict) -> None:
print(f"Alerting team about failed payment {event['payment_id']}")
def dispatch_event(event: dict) -> None:
event_type = event["type"]
if event_type not in _HANDLER_REGISTRY:
raise ValueError(f"No handler for event type: {event_type}")
handler_class = _HANDLER_REGISTRY[event_type]
handler = handler_class()
handler.handle(event)
# Usage
dispatch_event({"type": "order.placed", "order_id": "ORD-001"})
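Adding a new event handler is one decorator line. No factory class modification needed. This is the Open/Closed Principle in its most natural Python form. One caveat: the decorator runs only when the module that defines the handler is imported, so handler modules must be imported before events are dispatched.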
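A decorator registry like this overwrites silently if two classes claim the same event type. The following sketch shows one possible guard, under the assumption that duplicates should be an error; HANDLERS, register, and dispatch are hypothetical names chosen for this example:

```python
from typing import Callable

# Hypothetical registry mapping event type to handler class
HANDLERS: dict[str, type] = {}


def register(event_type: str) -> Callable[[type], type]:
    """Class decorator that refuses to overwrite an existing handler."""
    def decorator(cls: type) -> type:
        if event_type in HANDLERS:
            raise ValueError(f"Handler already registered for {event_type!r}")
        HANDLERS[event_type] = cls
        return cls
    return decorator


@register("user.deleted")
class UserDeletedHandler:
    def handle(self, event: dict) -> str:
        return f"Removing data for {event['user_id']}"


def dispatch(event: dict) -> str:
    handler_cls = HANDLERS.get(event["type"])
    if handler_cls is None:
        raise ValueError(f"No handler for event type: {event['type']}")
    return handler_cls().handle(event)


print(dispatch({"type": "user.deleted", "user_id": "U-7"}))
# Removing data for U-7

try:
    dispatch({"type": "user.renamed"})
except ValueError as exc:
    print(exc)  # No handler for event type: user.renamed
```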
__init_subclass__ for Auto-Registration
class BasePlugin:
"""Any subclass is automatically registered on class definition."""
_registry: dict[str, type] = {}
def __init_subclass__(cls, plugin_name: str = "", **kwargs):
super().__init_subclass__(**kwargs)
if plugin_name:
BasePlugin._registry[plugin_name] = cls
@classmethod
def create(cls, name: str, **kwargs) -> "BasePlugin":
if name not in cls._registry:
raise KeyError(
f"Plugin '{name}' not found. Available: {list(cls._registry)}"
)
return cls._registry[name](**kwargs)
class AudioPlugin(BasePlugin, plugin_name="audio"):
def __init__(self, sample_rate: int = 44100):
self.sample_rate = sample_rate
def process(self, data: bytes) -> bytes:
return data # no-op placeholder
class VideoPlugin(BasePlugin, plugin_name="video"):
def __init__(self, fps: int = 30):
self.fps = fps
def process(self, data: bytes) -> bytes:
return data
# Create plugins by name - no direct import of concrete class needed
audio = BasePlugin.create("audio", sample_rate=48000)
video = BasePlugin.create("video", fps=60)
print(type(audio)) # <class '__main__.AudioPlugin'>
print(audio.sample_rate) # 48000
The same idea, registration at class definition time rather than at instantiation, underlies Django's model registry (implemented there with a metaclass), SQLAlchemy's declarative mapping, and many Python plugin systems.
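The optional keyword is what lets intermediate base classes opt out of registration. A minimal sketch, using hypothetical Plugin, ImagePlugin, PngPlugin, and JpegPlugin classes, shows that only named subclasses are registered and that no instance is needed for registration to happen:

```python
class Plugin:
    """Hypothetical base class; subclasses register by keyword argument."""
    registry: dict[str, type] = {}

    def __init_subclass__(cls, plugin_name: str = "", **kwargs):
        super().__init_subclass__(**kwargs)
        if plugin_name:  # unnamed subclasses are skipped
            Plugin.registry[plugin_name] = cls


class ImagePlugin(Plugin):
    """Intermediate base: no plugin_name, so it is not registered."""


class PngPlugin(ImagePlugin, plugin_name="png"):
    pass


class JpegPlugin(ImagePlugin, plugin_name="jpeg"):
    pass


print(sorted(Plugin.registry))  # ['jpeg', 'png']
print(ImagePlugin in Plugin.registry.values())  # False

# No instance has been created yet; registration ran at class definition
print(Plugin.registry["png"] is PngPlugin)  # True
```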
Creational Patterns: When to Use Which
| Pattern | Signal to Use | Signal to Avoid |
|---|---|---|
| Singleton | One resource manager per process (DB pool, config, logger) | When you think "I only need one now" - you are predicting the future incorrectly |
| Factory Method | Type of object determined at runtime from outside the class | When there is only one type and it will never change |
| Abstract Factory | Multiple related objects that must be consistent with each other | When objects do not form a natural family |
| Builder | Object has 5+ optional parameters, or step-by-step construction is meaningful | When the object has 2-3 simple required parameters |
| Prototype | Cloning is cheaper than creation, or you need many similar variants | When objects are simple to construct from scratch |
Interview Patterns
The following questions appear in senior Python engineering interviews. Each tests whether you understand the pattern conceptually, not just syntactically.
Q1: How would you make a thread-safe Singleton in Python? Walk me through two approaches.
Strong answer:
The first approach is double-checked locking with __new__:
import threading
class ThreadSafeSingleton:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None: # First check: fast, no lock overhead
with cls._lock:
if cls._instance is None: # Second check: inside lock
cls._instance = super().__new__(cls)
return cls._instance
The double-check is mandatory. Without the inner check, two threads can both pass the outer check before either acquires the lock, and both will create a new instance.
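A quick way to check the claim is to rerun the 100-thread experiment from the start of the lesson against the locked version. The sketch below repeats the class so it is self-contained; the worker function and the ids list are illustrative names:

```python
import threading


class ThreadSafeSingleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:          # fast path, no lock
            with cls._lock:
                if cls._instance is None:  # re-check while holding the lock
                    cls._instance = super().__new__(cls)
        return cls._instance


ids: list[int] = []
ids_lock = threading.Lock()


def worker() -> None:
    obj = ThreadSafeSingleton()
    with ids_lock:
        ids.append(id(obj))


threads = [threading.Thread(target=worker) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(set(ids)))  # 1
```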
The second approach is a metaclass, which is cleaner for inheritance hierarchies:
class SingletonMeta(type):
_instances = {}
_lock = threading.Lock()
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
with cls._lock:
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
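One consequence of the metaclass approach is worth showing in an interview: __init__ runs only for the first call, so later constructor arguments are ignored. The sketch below repeats the metaclass and applies it to two hypothetical classes, AppSettings and FeatureFlags:

```python
import threading


class SingletonMeta(type):
    _instances: dict = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            with cls._lock:
                if cls not in cls._instances:
                    cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class AppSettings(metaclass=SingletonMeta):  # hypothetical class
    init_calls = 0

    def __init__(self, env: str = "dev"):
        AppSettings.init_calls += 1
        self.env = env


class FeatureFlags(metaclass=SingletonMeta):  # hypothetical class
    pass


a = AppSettings("prod")
b = AppSettings("staging")  # arguments ignored: the instance already exists

print(a is b)                  # True
print(b.env)                   # prod
print(AppSettings.init_calls)  # 1
print(FeatureFlags() is a)     # False
```

Each class that uses the metaclass gets its own entry in _instances, which is why FeatureFlags and AppSettings do not collide.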
The Pythonic approach for most cases: a module-level object. Python's sys.modules guarantees import-once semantics, making every module-level variable a natural singleton.
Follow-up: How do you reset a Singleton for testing?
# For __new__-based:
DatabasePool._instance = None
# For metaclass-based:
SingletonMeta._instances.pop(DatabasePool, None)
# For module-level, use mock.patch:
from unittest.mock import patch
with patch("myapp.config.config", MockConfig()):
run_tests()
Q2: What is the difference between Factory Method and Abstract Factory? Give a concrete example of when you would choose each.
Strong answer:
Factory Method creates one type of product. The "method" is a single creation method that subclasses override (in Python, often just a factory function selected at runtime). Use it when you have a single object whose type varies by context. Example: a data loader that creates a JSONParser, YAMLParser, or TOMLParser depending on file extension.
Abstract Factory creates a family of related objects that must be used together consistently. Use it when you have multiple objects that must agree with each other. Example: a cloud infrastructure factory that creates Storage, Compute, and Queue objects all from the same provider (all AWS or all GCP). You cannot mix providers - AWS and GCP clients differ in authentication and error formats in non-obvious ways, so an AWS queue object cannot simply be paired with GCP compute.
The key diagnostic question: "Are the objects I create related to each other?" If yes - Abstract Factory. If each object stands alone - Factory Method.
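The contrast can be sketched in a few lines. Everything below is illustrative: the class names are hypothetical, and plain strings stand in for real storage and queue clients. The Factory Method half varies one product per subclass; the Abstract Factory half produces two products that must come from the same provider:

```python
import json
from abc import ABC, abstractmethod


# --- Factory Method: one product, type chosen by the subclass ---
class Loader(ABC):
    @abstractmethod
    def create_parser(self):
        """Factory method: subclasses decide which parser to build."""

    def load(self, text: str):
        return self.create_parser()(text)


class JSONLoader(Loader):
    def create_parser(self):
        return json.loads


class CSVLineLoader(Loader):
    def create_parser(self):
        return lambda text: text.split(",")


# --- Abstract Factory: a family of products from one provider ---
class CloudFactory(ABC):
    @abstractmethod
    def create_storage(self) -> str: ...

    @abstractmethod
    def create_queue(self) -> str: ...


class AWSFactory(CloudFactory):
    def create_storage(self) -> str:
        return "aws-storage"  # placeholder for a real client

    def create_queue(self) -> str:
        return "aws-queue"


class GCPFactory(CloudFactory):
    def create_storage(self) -> str:
        return "gcp-storage"

    def create_queue(self) -> str:
        return "gcp-queue"


def provision(factory: CloudFactory) -> tuple[str, str]:
    # Both products come from the same factory, so they cannot be mixed
    return factory.create_storage(), factory.create_queue()


print(JSONLoader().load('{"a": 1}'))  # {'a': 1}
print(CSVLineLoader().load("x,y,z"))  # ['x', 'y', 'z']
print(provision(GCPFactory()))        # ('gcp-storage', 'gcp-queue')
```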
Q3: When would you use the Builder pattern over just keyword arguments in Python?
Strong answer:
Python's keyword arguments with defaults handle simple cases well and should be the default choice:
def create_config(model="bert", lr=3e-4, epochs=10):
...
Reach for Builder when:
- Construction has multiple stages that belong to distinct conceptual groups (with_optimizer, with_lr_schedule, saving_to)
- The object requires cross-field validation that is hard to express in one __init__ body
- You need multiple representations from the same construction process (build() vs build_for_debugging())
- The parameter count exceeds 5-7 and grouping them into named builder methods significantly improves readability at call sites
- You want to enforce construction order (e.g., you must call post() before with_bearer_token() makes semantic sense)
The biggest practical signal: if callers are using positional arguments and nobody can read the call site without the function signature open, it is time for a Builder.
Q4: Explain the difference between copy.copy and copy.deepcopy. When does using copy.copy on an object with a mutable nested attribute cause a bug?
Strong answer:
copy.copy creates a new object but does not recursively copy nested objects. The new object's mutable attributes point to the same underlying objects as the original. copy.deepcopy recursively copies all nested objects, so mutations to the copy do not affect the original.
The classic bug:
import copy
from dataclasses import dataclass, field
@dataclass
class Config:
name: str
hyperparams: dict # mutable nested object
original = Config(name="exp-1", hyperparams={"lr": 3e-4, "epochs": 10})
shallow = copy.copy(original)
# This looks like it only modifies 'shallow'
shallow.hyperparams["lr"] = 1e-3
# But it mutated 'original' too!
print(original.hyperparams["lr"]) # 0.001 - original is corrupted
The fix: use copy.deepcopy when the prototype's mutable attributes must be independent, or implement __copy__ and __deepcopy__ to control exactly what is shared vs. copied.
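Applied to the same example, the deepcopy fix looks like this. The sketch repeats the Config dataclass so it runs on its own; independent is an illustrative variable name:

```python
import copy
from dataclasses import dataclass


@dataclass
class Config:
    name: str
    hyperparams: dict  # mutable nested object


original = Config(name="exp-1", hyperparams={"lr": 3e-4, "epochs": 10})
independent = copy.deepcopy(original)
independent.hyperparams["lr"] = 1e-3

print(original.hyperparams["lr"])     # 0.0003
print(independent.hyperparams["lr"])  # 0.001
print(independent.hyperparams is original.hyperparams)  # False
```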
Q5: What is the Borg pattern and when would you prefer it over a traditional Singleton?
Strong answer:
The Borg pattern makes all instances of a class share the same __dict__. Instances are distinct objects (a is b returns False), but they share identical state:
class Borg:
_shared_state = {}
def __init__(self):
self.__dict__ = self._shared_state
Prefer Borg over Singleton when:
- You want subclasses to have their own shared state pools: each subclass can define its own _shared_state = {}, giving each subclass an independent shared-state namespace.
- "They all agree on state" matters more than "there is only one object": some systems tolerate multiple objects as long as they all see the same data.
- Testing convenience matters: AppConfig._shared_state.clear() resets all instances between tests without needing to reach into _instance.
Prefer Singleton over Borg when:
- Object identity matters: pool1 is pool2 must be True for the design to be correct
- You need to control construction arguments precisely: Borg runs __init__ on every instantiation, so controlling first-time-only initialization requires extra guards
- The object manages an external resource (file handle, socket, connection pool) where having truly one object is a correctness requirement
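These trade-offs are easier to see in a runnable sketch. It repeats the Borg class and adds a hypothetical CacheSettings subclass with its own state pool, then shows shared state, distinct identity, subclass isolation, and the test-time reset:

```python
class Borg:
    _shared_state: dict = {}

    def __init__(self):
        # Every instance uses the class-level dict as its attribute storage
        self.__dict__ = self._shared_state


class CacheSettings(Borg):  # hypothetical subclass with its own pool
    _shared_state: dict = {}


a, b = Borg(), Borg()
a.region = "eu-west-1"

print(a is b)    # False
print(b.region)  # eu-west-1

c = CacheSettings()
print(hasattr(c, "region"))  # False

Borg._shared_state.clear()   # reset, e.g. between tests
print(hasattr(a, "region"))  # False
```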
In production Python, both usually lose out to the module-level singleton for configuration, and to dependency injection (passing the shared object explicitly) for everything else.
Summary
| Pattern | Core Idea | Python Idiom |
|---|---|---|
| Singleton | One instance per process | Module-level object; metaclass for complex cases |
| Factory Method | Delegate object creation to a method or subclass | Dictionary of callables; ABC with concrete subclasses |
| Abstract Factory | Create families of related objects consistently | Factory class with multiple create_* methods |
| Builder | Step-by-step construction via fluent API | Chained method calls returning self; dataclass + __post_init__ for validation |
| Prototype | Clone existing objects instead of creating from scratch | copy.deepcopy; custom __deepcopy__ for performance |
The common thread across all creational patterns: they decouple where an object is created from where it is used. This decoupling is what makes code testable - you can inject a mock factory, a test prototype, or a fake Singleton without touching the code under test.
Move on to Lesson 02: Structural Patterns.
